Example flow for processing and aggregating stats about committee meeting attendees and protocol parts

See the DataFlows documentation for more details regarding the Flow object and processing functions.
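For readers new to DataFlows, the pattern used throughout this notebook is: a Flow is a sequence of steps, and a plain Python function acts as a row processor that can modify each row or collect statistics as a side effect. A minimal, self-contained sketch (the sample rows and field names below are made up for illustration and are not part of this notebook's data):

from dataflows import Flow

sample_rows = [{'KnessetNum': 20, 'attendees': 3}, {'KnessetNum': 19, 'attendees': 5}]

def add_doubled_attendees(row):
    # a row processor receives each row as a dict; mutating it changes the output
    row['attendees_doubled'] = row['attendees'] * 2

# .results()[0] is the list of resources, each a list of row dicts
print(Flow(sample_rows, add_doubled_attendees).results()[0][0])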

Feel free to modify and commit changes which demonstrate additional functionality or relevant data.

Constants


In [1]:
# Limit processing of protocol parts for development
PROCESS_PARTS_LIMIT = 500

# Enable caching of protocol parts data (not efficient; should only be used for local development with a sensible PROCESS_PARTS_LIMIT)
PROCESS_PARTS_CACHE = True

# Filter the meetings to be processed; these kwargs are passed along to the DataFlows filter_rows processor for the meetings resource
MEETINGS_FILTER_ROWS_KWARGS = {'equals': [{'KnessetNum': 20}]}

# Don't use local data - load everything from the Knesset data remote storage
# When set to False, caching is also enabled, so data won't be downloaded from remote storage on the 2nd run.
USE_DATA = False
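
The MEETINGS_FILTER_ROWS_KWARGS above are expanded as keyword arguments of the DataFlows filter_rows processor. A small sketch of what that filtering does, on made-up in-memory rows (not this notebook's real data):

from dataflows import Flow, filter_rows

meetings = [{'CommitteeSessionID': 1, 'KnessetNum': 20},
            {'CommitteeSessionID': 2, 'KnessetNum': 19}]

# keeps only rows where KnessetNum == 20, same as MEETINGS_FILTER_ROWS_KWARGS
print(Flow(meetings, filter_rows(equals=[{'KnessetNum': 20}])).results()[0][0])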

Load source data


In [2]:
from dataflows import filter_rows, cache
from datapackage_pipelines_knesset.common_flow import load_knesset_data, load_member_names

# Load a dict mapping Knesset member IDs to member names
member_names = load_member_names(use_data=USE_DATA)

# define flow steps for loading the source committee meetings data
# the actual loading is done later in the Flow
load_steps = (
    load_knesset_data('people/committees/meeting-attendees/datapackage.json', USE_DATA),
    filter_rows(**MEETINGS_FILTER_ROWS_KWARGS)
)

if not USE_DATA:
    # when loading from a URL, enable caching so the download is skipped on the 2nd run
    load_steps = (cache(*load_steps, cache_path='.cache/people-committee-meeting-attendees-knesset-20'),)


DEBUG   :Starting new HTTPS connection (1): storage.googleapis.com:443
loading from url: https://storage.googleapis.com/knesset-data-pipelines/data/members/mk_individual/datapackage.json
loading fresh data, saving cache to: .cache/members-mk-individual-names
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/members/mk_individual/datapackage.json HTTP/1.1" 200 14559
DEBUG   :Starting new HTTPS connection (1): storage.googleapis.com:443
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/members/mk_individual/datapackage.json HTTP/1.1" 200 14559
DEBUG   :Starting new HTTPS connection (1): storage.googleapis.com:443
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual_names.csv HTTP/1.1" 200 48370
DEBUG   :Starting new HTTPS connection (2): storage.googleapis.com:443
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/members/mk_individual/mk_individual_names.csv HTTP/1.1" 200 48370
loading from url: https://storage.googleapis.com/knesset-data-pipelines/data/people/committees/meeting-attendees/datapackage.json

Inspect the datapackages which will be loaded

The last command's output log should contain URLs to datapackage.json files. Open them and check the table schema to see the resource metadata and the available fields you can use in the processing functions.

Check the frictionlessdata docs for more details about the datapackage file format.
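
A datapackage descriptor can also be inspected directly from Python. A minimal sketch using requests, with the meeting-attendees URL taken from the output log above (this snippet is illustrative and not part of the notebook's flow):

import requests

url = 'https://storage.googleapis.com/knesset-data-pipelines/data/people/committees/meeting-attendees/datapackage.json'
descriptor = requests.get(url).json()
for resource in descriptor['resources']:
    # each resource has a name and a table schema listing the available fields
    print(resource['name'], [field['name'] for field in resource.get('schema', {}).get('fields', [])])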

Main processing functions


In [3]:
from collections import defaultdict
from dataflows import Flow

stats = defaultdict(int)
member_attended_meetings = defaultdict(int)

def process_meeting_protocol_part(row):
    stats['processed parts'] += 1
    if row['body'] and 'אנחנו ככנסת צריכים להיות ערוכים' in row['body']:
        stats['meetings contain text: we as knesset need to be prepared'] += 1

def process_meeting(row):
    stats['total meetings'] += 1
    if row['attended_mk_individual_ids']:
        for mk_id in row['attended_mk_individual_ids']:
            member_attended_meetings[mk_id] += 1
    parts_filename = row['parts_parsed_filename']
    if parts_filename:
        # process this meeting's protocol parts in a nested Flow, until the development limit is reached
        if PROCESS_PARTS_LIMIT and stats['processed parts'] < PROCESS_PARTS_LIMIT:
            steps = (load_knesset_data('committees/meeting_protocols_parts/' + parts_filename, USE_DATA),)
            if not USE_DATA and PROCESS_PARTS_CACHE:
                # cache each protocol's parts separately (only useful for local development)
                steps = (cache(*steps, cache_path='.cache/committee-meeting-protocol-parts/' + parts_filename),)
            steps += (process_meeting_protocol_part,)
            Flow(*steps).process()

process_steps = (process_meeting,)

Run the flow


In [ ]:
from dataflows import Flow, dump_to_path

Flow(*load_steps, *process_steps, dump_to_path('data/committee-meeting-attendees-parts')).process()


DEBUG   :Starting new HTTPS connection (1): storage.googleapis.com:443
loading fresh data, saving cache to: .cache/people-committee-meeting-attendees-knesset-20
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/people/committees/meeting-attendees/datapackage.json HTTP/1.1" 200 4333
DEBUG   :Starting new HTTPS connection (1): storage.googleapis.com:443
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/people/committees/meeting-attendees/datapackage.json HTTP/1.1" 200 4333
DEBUG   :Starting new HTTPS connection (1): storage.googleapis.com:443
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/people/committees/meeting-attendees/kns_committeesession.csv HTTP/1.1" 200 109881244
DEBUG   :Starting new HTTPS connection (2): storage.googleapis.com:443
DEBUG   :https://storage.googleapis.com:443 "GET /knesset-data-pipelines/data/people/committees/meeting-attendees/kns_committeesession.csv HTTP/1.1" 200 109881244

Aggregate and print stats


In [5]:
from collections import deque
import yaml

# sort ascending by attendance count and keep the last 5 items, i.e. the 5 most-attending members
top_attended_member_names = [member_names[mk_id] for mk_id, num_attended in
                             deque(sorted(member_attended_meetings.items(), key=lambda kv: kv[1]), maxlen=5)]
print('\n')
print('-- top attended members --')
print(top_attended_member_names)
print('\n')
print('-- stats --')
print(yaml.dump(dict(stats), default_flow_style=False, allow_unicode=True))



-- top attended members --
['איתן ברושי', 'מיכאל לוי', 'דוב חנין', 'משה גפני', 'אורי מקלב']


-- stats --
processed parts: 624
total meetings: 9402
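
The deque(sorted(...), maxlen=5) idiom above keeps only the last five items of an ascending sort, i.e. the five members with the highest attendance counts. An equivalent and arguably more explicit sketch, reusing the dicts built above:

import heapq

# the 5 largest attendance counts, in descending order
top5 = heapq.nlargest(5, member_attended_meetings.items(), key=lambda kv: kv[1])
print([member_names[mk_id] for mk_id, num_attended in top5])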

Get output data

Output data is available in the left sidebar under the data directory. You can open the datapackage.json and the created CSV file to explore the data and schema.
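
The dumped datapackage can also be loaded back into a Flow to explore it programmatically. A minimal sketch, assuming the dump above completed and the output path was not changed:

from dataflows import Flow, load, printer

# load the datapackage created by dump_to_path and print a few sample rows
Flow(
    load('data/committee-meeting-attendees-parts/datapackage.json'),
    printer(num_rows=5),
).process()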